package DataStreamApi.Transformation算子.物理分区算子;

import domain.WaterSensor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;
import java.util.List;

/**
 * Demonstrates custom physical partitioning of a {@link DataStream}.
 *
 * <p>A small in-memory collection of {@link WaterSensor} records is routed to downstream
 * subtasks by {@code Flink05_Partitioner}, keyed on each record's sensor id, and the
 * result is printed so the chosen subtask for every record can be observed.
 */
public class Flink05_自定义分区 {

    /**
     * Builds and runs the demo job in a local environment with the web UI enabled.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the Flink job fails to build or execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
        executionEnvironment.setParallelism(6);

        List<WaterSensor> waterSensorList = Arrays.asList(
                new WaterSensor("s1", 1L, 1),
                new WaterSensor("s2", 2L, 2),
                new WaterSensor("s2", 3L, 3),
                new WaterSensor("s3", 1L, 4),
                new WaterSensor("s1", 11L, 5),
                new WaterSensor("s1", 2L, 6));

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = executionEnvironment.fromCollection(waterSensorList);

        // Pass a key selector so the partitioner receives the id of EACH record.
        // The previous code passed the id of a freshly constructed, empty WaterSensor,
        // i.e. a plain String value that was interpreted as a field expression.
        // NOTE(review): assumes Flink05_Partitioner implements Partitioner<String> — confirm.
        DataStream<WaterSensor> waterSensorDataStream = waterSensorDataStreamSource
                .partitionCustom(new Flink05_Partitioner(), WaterSensor::getId);

        // Attach a sink: without one the partitioned stream is never consumed and the
        // job has nothing to run. The printed output shows which subtask got each record.
        waterSensorDataStream.print();

        executionEnvironment.execute();
    }
}
